package org.zjvis.datascience.common.util;

import com.alibaba.fastjson.JSONObject;

import java.io.IOException;
import java.security.KeyManagementException;
import java.security.KeyStoreException;
import java.security.NoSuchAlgorithmException;
import javax.annotation.PostConstruct;
import javax.net.ssl.SSLContext;

import org.apache.http.conn.ssl.NoopHostnameVerifier;
import org.apache.http.conn.ssl.SSLConnectionSocketFactory;
import org.apache.http.conn.ssl.TrustSelfSignedStrategy;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.http.HttpEntity;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpMethod;
import org.springframework.http.MediaType;
import org.springframework.http.ResponseEntity;
import org.springframework.http.client.HttpComponentsClientHttpRequestFactory;
import org.springframework.stereotype.Component;
import org.springframework.util.LinkedMultiValueMap;
import org.springframework.util.MultiValueMap;
import org.springframework.web.client.RestClientException;
import org.springframework.web.client.RestTemplate;
import org.zjvis.datascience.common.model.ApiResult;
import org.zjvis.datascience.common.model.ApiResultCode;
import org.zjvis.datascience.common.vo.JobStatusVO;

/**
 * Utility component for calling REST services (the job server and the Flask server).
 *
 * <p>Several {@link RestTemplate} instances with different timeout profiles are created once in
 * {@link #init()}. Only the default template is backed by the HTTP client built with the custom
 * SSL context; the other templates use default request factories with explicit timeouts.
 *
 * @description REST service invocation utility
 * @date 2021-11-22
 */
@Component
public class RestTemplateUtil {

    private static final Logger logger = LoggerFactory.getLogger(RestTemplateUtil.class);

    /** Uniform timeout (ms) applied to the audit template. */
    private static final int AUDIT_TIMEOUT_MILLIS = 2000;

    /** Uniform timeout (ms) applied to the ML and Flask templates. */
    private static final int LONG_TIMEOUT_MILLIS = 20000;

    /** Default template: pooled client with the custom SSL context, no explicit timeouts. */
    private RestTemplate restTemplate;

    /** Template with 2s connection-request/connect/read timeouts. */
    private RestTemplate restTemplateForAudit;

    /** Template with a 2s connect timeout and a configurable read timeout. */
    private RestTemplate restTemplateForTimeout;

    /** Template with 20s connection-request/connect/read timeouts. */
    private RestTemplate restTemplateForML;

    /** Template with 20s connection-request/connect/read timeouts, used for Flask calls. */
    private RestTemplate restTemplateForFlask;

    /** Base address of the job server; falls back to this application when not configured. */
    @Value("${restful.jobServer.address:#{null}}")
    private String jobServer;

    /** Base address of the Flask server. */
    @Value("${restful.flaskServer.address}")
    private String flaskServer;


    /** Port of this application, used to build the local job server fallback address. */
    @Value("${server.port}")
    private int port;

    /** True when no job server address was configured and the local fallback is in use. */
    private boolean localFlag = false;

    /**
     * @return the configured Flask server base address
     */
    public String getFlaskServer(){
        return this.flaskServer;
    }

    /**
     * Builds all {@link RestTemplate} instances and resolves the job server address.
     *
     * <p>NOTE(review): the SSL context accepts self-signed certificates and hostname
     * verification is disabled via {@link NoopHostnameVerifier}. That is convenient for a
     * self-signed local endpoint but weakens TLS guarantees for every host reached through the
     * default template - confirm this is intended.
     *
     * @throws KeyStoreException        if the trust material cannot be loaded
     * @throws NoSuchAlgorithmException if the SSL context algorithm is unavailable
     * @throws KeyManagementException   if the SSL context cannot be initialised
     */
    @PostConstruct
    public void init() throws KeyStoreException, NoSuchAlgorithmException, KeyManagementException {
        SSLContext sslContext = org.apache.http.ssl.SSLContexts.custom()
                .loadTrustMaterial(null, new TrustSelfSignedStrategy())
                .build();

        SSLConnectionSocketFactory csf = new SSLConnectionSocketFactory(sslContext, NoopHostnameVerifier.INSTANCE);

        CloseableHttpClient httpClient = HttpClients.custom()
                .setSSLSocketFactory(csf)
                .setMaxConnTotal(200)
                .setMaxConnPerRoute(100)
                .build();

        HttpComponentsClientHttpRequestFactory requestFactory = new HttpComponentsClientHttpRequestFactory();
        requestFactory.setHttpClient(httpClient);

        this.restTemplate = new RestTemplate(requestFactory);

        initAuditRestTemplate();

        initTimeoutTemplate(15 * 1000);

        initMLRestTemplate();

        initFlaskRestTemplate();

        setLocalJobServer();
    }

    /**
     * Falls back to {@code https://localhost:<port>} when no job server address is configured,
     * and records that fallback in {@link #localFlag}.
     */
    private void setLocalJobServer() {
        if (null == this.jobServer || this.jobServer.isEmpty()) {
            this.jobServer = "https://localhost:" + port;
            localFlag = true;
        }
        logger.info("using jobserver at {}", this.jobServer);
    }

    private String getJobServer() {
        return this.jobServer;
    }

    /**
     * @return {@code true} when the job server address was not configured and the local
     *         fallback address is being used
     */
    public boolean getLocalFlag(){
        return this.localFlag;
    }

    /**
     * Creates a template whose connection-request, connect and read timeouts all share one value.
     *
     * @param timeoutMillis timeout in milliseconds applied to all three settings
     * @return a new template backed by a default HttpComponents request factory
     */
    private static RestTemplate newUniformTimeoutTemplate(int timeoutMillis) {
        HttpComponentsClientHttpRequestFactory factory = new HttpComponentsClientHttpRequestFactory();
        factory.setConnectionRequestTimeout(timeoutMillis);
        factory.setReadTimeout(timeoutMillis);
        factory.setConnectTimeout(timeoutMillis);
        return new RestTemplate(factory);
    }

    private void initAuditRestTemplate() {
        restTemplateForAudit = newUniformTimeoutTemplate(AUDIT_TIMEOUT_MILLIS);
    }

    private void initMLRestTemplate() {
        restTemplateForML = newUniformTimeoutTemplate(LONG_TIMEOUT_MILLIS);
    }

    private void initFlaskRestTemplate() {
        restTemplateForFlask = newUniformTimeoutTemplate(LONG_TIMEOUT_MILLIS);
    }

    /**
     * Creates the template with a fixed 2s connect timeout and the given read timeout.
     *
     * <p>NOTE(review): this template does not use the SSL-relaxed HTTP client built in
     * {@link #init()}, yet {@link #submitPySparkJob} uses it against the job server, whose
     * fallback address is {@code https://localhost}. Confirm the certificate there is trusted
     * by the default trust store.
     *
     * @param readTimeout read timeout in milliseconds
     */
    private void initTimeoutTemplate(Integer readTimeout) {
        HttpComponentsClientHttpRequestFactory clientHttpRequestFactory = new HttpComponentsClientHttpRequestFactory();
        clientHttpRequestFactory.setConnectTimeout(2000);
        clientHttpRequestFactory.setReadTimeout(readTimeout);
        restTemplateForTimeout = new RestTemplate(clientHttpRequestFactory);

    }

    /**
     * Posts {@code appArgs} to a job server endpoint, both as a query parameter and in the
     * JSON request body.
     *
     * <p>The value is supplied as a URI template variable instead of being concatenated into
     * the URL: the URL string given to {@link RestTemplate} is itself a URI template, so braces
     * inside a concatenated value would be treated as unresolved placeholders.
     *
     * @param path    endpoint path starting with {@code /}
     * @param appArgs command line arguments
     * @return the response body, possibly {@code null}
     */
    private ApiResult postAppArgs(String path, String appArgs) {
        String url = getJobServer() + path + "?appArgs={appArgs}";
        JSONObject reqBody = new JSONObject();
        reqBody.put("appArgs", appArgs);
        HttpEntity<JSONObject> req = new HttpEntity<>(reqBody);
        ResponseEntity<ApiResult> entity = restTemplate
                .exchange(url, HttpMethod.POST, req, ApiResult.class, appArgs);
        return entity.getBody();
    }

    /**
     * Submits a job through the job server.
     *
     * @param appArgs command line arguments
     * @return the application id; empty/{@code null} means the submission failed
     */
    public String submitJob(String appArgs) {
        ApiResult body = postAppArgs("/job/submitJob", appArgs);
        // A response without a body is reported as a failed submission rather than an NPE.
        return null == body ? null : (String) body.getResult();
    }

    /**
     * Submits an ML job through the job server.
     *
     * @param appArgs command line arguments
     * @return the raw response body of the job server, possibly {@code null}
     */
    public ApiResult submitModelJob(String appArgs) {
        return postAppArgs("/job/submitMLJob", appArgs);
    }


    /**
     * Submits a PySpark job through the job server using the template with a bounded read
     * timeout.
     *
     * @param appArgs         command line arguments
     * @param appResourcePath path of the application resource
     * @return the {@code result} field of the response, or {@code null} when the response has
     *         no body
     * @throws IOException declared for caller compatibility; not thrown by this implementation
     */
    public String submitPySparkJob(String appArgs, String appResourcePath) throws IOException {
        String url = String.format("%s/job/submitPySparkJob", getJobServer());
        JSONObject reqBody = new JSONObject();
        reqBody.put("appArgs", appArgs);
        reqBody.put("appResourcePath", appResourcePath);
        HttpEntity<JSONObject> req = new HttpEntity<>(reqBody);

        ResponseEntity<ApiResult> entity = restTemplateForTimeout
                .exchange(url, HttpMethod.POST, req, ApiResult.class);

        ApiResult body = entity.getBody();
        return null == body ? null : (String) body.getResult();
    }

    /**
     * Posts every entry of {@code params} except the path entry to the Flask server as a
     * URL-encoded form.
     *
     * @param params  request parameters, including the entry that holds the target path
     * @param pathKey key in {@code params} whose value is the path appended to the Flask address
     * @return the response body as a string
     */
    private String postFormToFlask(JSONObject params, String pathKey) {
        String url = String.format("%s/%s", flaskServer, params.getString(pathKey));
        HttpHeaders headers = new HttpHeaders();
        headers.setContentType(MediaType.APPLICATION_FORM_URLENCODED);
        MultiValueMap<String, String> form = new LinkedMultiValueMap<>();
        for (String key : params.keySet()) {
            // The path entry selects the endpoint and is not part of the form payload.
            if (!pathKey.equals(key)) {
                form.add(key, params.getString(key));
            }
        }
        HttpEntity<MultiValueMap<String, String>> request = new HttpEntity<>(form, headers);
        return restTemplateForFlask.postForObject(url, request, String.class);
    }

    /**
     * Posts a form request to the Flask server; the target path is read from {@code "aPath"}.
     *
     * @param params request parameters; {@code "aPath"} holds the path, all other entries are
     *               sent as form fields
     * @return the response body as a string
     * @throws IOException declared for caller compatibility; not thrown by this implementation
     */
    public String submitMLFlask(JSONObject params) throws IOException {
        return postFormToFlask(params, "aPath");
    }

    /**
     * Posts a form request to the Flask server; the target path is read from {@code "apiPath"}.
     *
     * @param params request parameters; {@code "apiPath"} holds the path, all other entries are
     *               sent as form fields
     * @return the response body as a string
     * @throws IOException declared for caller compatibility; not thrown by this implementation
     */
    public String submitFlaskJob(JSONObject params) throws IOException {
        return postFormToFlask(params, "apiPath");
    }


    /**
     * Writes a PySpark script to the given HDFS path via the job server.
     *
     * @param appResourcePath target path
     * @param scriptBody      script content
     * @return {@code true} when the job server answers with the success code; {@code false}
     *         otherwise, including when the response has no body
     */
    public boolean writePyScriptToHDFS(String appResourcePath, String scriptBody) {
        String url = String.format("%s/job/writePyScriptToHDFS", getJobServer());
        JSONObject reqBody = new JSONObject();
        reqBody.put("scriptBody", scriptBody);
        reqBody.put("appResourcePath", appResourcePath);
        HttpEntity<JSONObject> req = new HttpEntity<>(reqBody);
        ResponseEntity<ApiResult> entity = restTemplate
                .exchange(url, HttpMethod.POST, req, ApiResult.class);
        ApiResult body = entity.getBody();
        return null != body && body.getCode() == ApiResultCode.SUCCESS.getCode();
    }

    /**
     * Queries the execution status of a Spark job.
     *
     * @param applicationId id of the application to query
     * @param directory     currently unused by this method
     * @return the parsed status, or an empty {@link JobStatusVO} when the response has no body
     *         or does not carry the success code
     */
    public JobStatusVO queryJobStatus(String applicationId, String directory) {
        String url = getJobServer() + "/job/queryJobStatus";
        JSONObject json = new JSONObject();
        json.put("applicationId", applicationId);
        HttpEntity<JSONObject> req = new HttpEntity<>(json);
        ResponseEntity<ApiResult> response = restTemplate
                .exchange(url, HttpMethod.POST, req, ApiResult.class);
        ApiResult<String> body = response.getBody();
        if (null == body || body.getCode() != ApiResultCode.SUCCESS.getCode()) {
            return new JobStatusVO();
        }
        String jsonString = body.getResult();
        return JSONObject.parseObject(jsonString, JobStatusVO.class);
    }

    /**
     * Kills a submitted Spark job.
     *
     * @param applicationId id of the application to kill
     * @return {@code true} when the job server answers with the success code
     */
    public boolean killJob(String applicationId) {
        String url = getJobServer() + "/job/killJob";
        JobStatusVO reqBody = new JobStatusVO();
        reqBody.setId(applicationId);
        HttpEntity<JobStatusVO> req = new HttpEntity<>(reqBody);
        ResponseEntity<ApiResult> response = restTemplate
                .exchange(url, HttpMethod.POST, req, ApiResult.class);
        ApiResult body = response.getBody();
        return null != body && body.getCode() == ApiResultCode.SUCCESS.getCode();
    }

    /**
     * Runs a job through the job server and waits for its result.
     *
     * @param appArgs command line arguments
     * @return the {@code result} field of the response, or an empty string when the request
     *         fails, the response has no body, or it does not carry the success code
     */
    public String runJob(String appArgs) {
        String url = getJobServer() + "/job/runJob";
        JSONObject reqBody = new JSONObject();
        reqBody.put("appArgs", appArgs);
        HttpEntity<JSONObject> req = new HttpEntity<>(reqBody);
        logger.info("RestTemplateUtil.runJob(), before send request, req={}, timestamp={}", req,
                System.currentTimeMillis());
        ResponseEntity<ApiResult> response;
        try {
            // Takes roughly two and a half minutes (150s) before the result comes back.
            response = restTemplate.exchange(url, HttpMethod.POST, req, ApiResult.class);
        } catch (RestClientException e) {
            // Route the failure through the logger instead of printing to stderr.
            logger.error("RestTemplateUtil.runJob(), request failed, url={}", url, e);
            return "";
        }
        logger.info("RestTemplateUtil.runJob(), received response, response={}, timestamp={}",
                response, System.currentTimeMillis());
        ApiResult<String> body = response.getBody();
        if (null == body || body.getCode() != ApiResultCode.SUCCESS.getCode()) {
            return "";
        }
        return body.getResult();
    }

//  Disabled code kept for reference.
//  /**
//   * Send a dataX configuration file to the dataX server
//   */
//  public String sendDataXConfig(String fileName, String json) {
//    String url = dataXServer + "/upload";
//    DataXDTO dataXBean = new DataXDTO(fileName, json);
//    HttpEntity<DataXDTO> req = new HttpEntity<>(dataXBean);
//    ResponseEntity<String> response = restTemplate
//        .exchange(url, HttpMethod.POST, req, String.class);
//    return response.getBody();
//  }
//
//  /**
//   * Execute a dataX job
//   */
//  public String executeDataXJob(String fileName) {
//    String url = dataXServer + "/execute?fileName=" + fileName;
//    HttpEntity<String> req = new HttpEntity<>(fileName);
//    ResponseEntity<String> response = restTemplate.exchange(url, HttpMethod.GET, req, String.class);
//    return response.getBody();
//  }
}
